package com.anji.plus.gaea.job.trigger.thread;

import com.anji.plus.gaea.job.trigger.config.TriggerConfiguration;
import com.anji.plus.gaea.job.core.dto.ReturnT;
import com.anji.plus.gaea.job.core.param.RegistryParam;
import com.anji.plus.gaea.job.trigger.dao.DaoService;
import com.anji.plus.gaea.job.trigger.dao.entity.MetaJobExecutor;
import com.anji.plus.gaea.job.core.util.ObjectUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;

/**
 * Executor heartbeat registration helper for the trigger side.
 *
 * <p>Owns a small thread pool that applies executor register / unregister requests
 * asynchronously, plus a daemon monitor thread that periodically scans the persisted
 * executor records, refreshes the in-memory table of online executor addresses and
 * marks executors whose heartbeat has expired as down.
 *
 * <p>The two static maps are {@link ConcurrentHashMap}s; the address lists this class
 * stores in {@code onlineExecutorAddressMap} are {@link CopyOnWriteArrayList}s so they
 * can be read by callers while the monitor thread and pool workers modify them.
 *
 * Created by xuxueli on 17/3/10.
 *
 * Borrowed from xxljob v2.4.0
 */
public class JobRegistryHelper {
    private static final Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class);
    private static final JobRegistryHelper instance = new JobRegistryHelper();

    /**
     * @return the process-wide singleton
     */
    public static JobRegistryHelper getInstance(){
        return instance;
    }

    // Thread pool that processes executor registry / removal requests; created by start().
    private volatile ThreadPoolExecutor registryOrRemoveThreadPool = null;
    // Daemon thread running the heartbeat monitor loop; created by start().
    private volatile Thread registryMonitorThread;
    private volatile boolean toStop = false;

    // Addresses of all online executors: key = application name (executor code),
    // value = address list, e.g. [http://10.108.12.22:9097/]
    private static final Map<String, List<String>> onlineExecutorAddressMap = new ConcurrentHashMap<String, List<String>>();

    // Job handler names of every executor that has ever registered, whether or not it is currently online.
    private static final Map<String, List<String>> executorJobHandlerMap = new ConcurrentHashMap<String, List<String>>();

    /**
     * Starts the registry/remove thread pool and the heartbeat monitor thread.
     */
    public void start(){
        // for registry or remove
        registryOrRemoveThreadPool = new ThreadPoolExecutor(
                2,
                10,
                30L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                r -> new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode()),
                (r, executor) -> {
                    // Queue is full (or the pool is shut down): run the task on the submitting thread.
                    r.run();
                    logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
                }
        );

        Thread monitor = new Thread(this::runMonitorLoop);
        monitor.setDaemon(true);
        monitor.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
        registryMonitorThread = monitor;
        monitor.start();
    }

    /**
     * Body of the monitor thread: refresh the online table, sleep for the configured
     * beat timeout, repeat until {@link #stop()} is requested.
     */
    private void runMonitorLoop() {
        while (!toStop) {
            try {
                refreshOnlineExecutors();
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e.getMessage(), e);
                }
            }
            try {
                TimeUnit.SECONDS.sleep(TriggerConfiguration.getInstance().getTriggerProperties().getBeatTimeoutSeconds());
            } catch (InterruptedException e) {
                // An interrupt during stop() is expected and ends the loop via toStop.
                // Otherwise log and keep monitoring; the flag is deliberately not restored
                // because that would make every following sleep fail immediately.
                if (!toStop) {
                    logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e.getMessage(), e);
                }
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
    }

    /**
     * Scans the executor records returned by the DAO once: executors whose expire time
     * is still in the future are added to the online table, all others are marked down.
     */
    private void refreshOnlineExecutors() {
        long currentMillis = System.currentTimeMillis();

        List<MetaJobExecutor> executorList = DaoService.getInstance().queryMetaJobExecutorList();
        if (executorList == null) {
            return;
        }
        for (MetaJobExecutor metaJobExecutor : executorList) {
            Long expireTimemillis = metaJobExecutor.getExpireTimemillis();
            if (expireTimemillis != null && expireTimemillis.longValue() > currentMillis) {
                // heartbeat is still valid
                markOnline(metaJobExecutor);
            } else {
                // heartbeat missing or expired
                markExpired(metaJobExecutor);
            }
        }
    }

    /** Adds the executor's address to the online table if it is not already listed. */
    private void markOnline(MetaJobExecutor metaJobExecutor) {
        String appName = metaJobExecutor.getExecutorCode();
        String address = metaJobExecutor.getExecutorAddress();
        // computeIfAbsent creates the list atomically; the copy-on-write list tolerates
        // concurrent removal by pool workers and iteration by callers.
        List<String> registryList = onlineExecutorAddressMap.computeIfAbsent(appName, key -> new CopyOnWriteArrayList<String>());
        if (!registryList.contains(address)) {
            registryList.add(address);
        }
    }

    /** Marks the executor as down in storage and drops its address from the online table. */
    private void markExpired(MetaJobExecutor metaJobExecutor) {
        DaoService.getInstance().registryDown(metaJobExecutor.getId());

        // Without this removal an expired executor would stay in the online table forever,
        // because the monitor loop otherwise only ever adds addresses.
        String appName = metaJobExecutor.getExecutorCode();
        if (appName != null) {
            List<String> registryList = onlineExecutorAddressMap.get(appName);
            if (registryList != null) {
                registryList.remove(metaJobExecutor.getExecutorAddress());
            }
        }
        logger.debug("executor:{}-{} registry timeout, update available to invalid 0", metaJobExecutor.getExecutorCode(), metaJobExecutor.getExecutorAddress());
    }

    /**
     * Stops the thread pool and the monitor thread, waiting for the monitor thread to exit.
     * Safe to call even if {@link #start()} was never invoked.
     */
    public void stop(){
        toStop = true;

        // stop registryOrRemoveThreadPool
        ThreadPoolExecutor pool = registryOrRemoveThreadPool;
        if (pool != null) {
            pool.shutdownNow();
        }

        // stop monitor (interrupt and wait)
        Thread monitor = registryMonitorThread;
        if (monitor != null) {
            monitor.interrupt();
            try {
                monitor.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
                // preserve the caller's interrupt status
                Thread.currentThread().interrupt();
            }
        }
    }


    /**
     * Handles an executor heartbeat / registration request asynchronously: refreshes (or
     * creates) the executor's persisted record with a new expire time and merges the
     * reported job handler names into the per-executor handler table.
     *
     * @param registryParam registration payload; must be non-null and valid
     * @return {@code ReturnT.SUCCESS} once the work is queued, or a failure result when
     *         the parameter is null or invalid
     */
    public ReturnT<String> registryUp(RegistryParam registryParam) {
        if (registryParam == null || !registryParam.isValid()) {
            return ReturnT.fail("Argument registryParam valid fail");
        }
        registryOrRemoveThreadPool.execute(() -> {
            String executorCode = registryParam.getExecutorCode();
            String executorAddress = registryParam.getExecutorAddress();

            // Update table meta_job_executor; 1000L forces long arithmetic so a large
            // beat-dead setting cannot overflow int.
            long expireTimemillis = System.currentTimeMillis() + TriggerConfiguration.getInstance().getTriggerProperties().getBeatDeadSeconds() * 1000L;
            int count = DaoService.getInstance().registryUp(executorCode, executorAddress, expireTimemillis);
            if (count < 1) {
                // no existing row was updated, so insert a new one
                DaoService.getInstance().registrySave(executorCode, executorAddress, expireTimemillis);
            }

            // Maintain the executor's job handler list. compute() makes the
            // read-merge-write atomic with respect to concurrent registrations.
            List<String> handlers = ObjectUtil.array2List(registryParam.getJobHandlerBeanNames());
            executorJobHandlerMap.compute(executorCode, (code, existing) -> {
                if (existing == null) {
                    return handlers;
                }
                List<String> merged = ObjectUtil.mergerList(existing, handlers);
                return merged;
            });
        });
        return ReturnT.SUCCESS;
    }

    /**
     * Handles an executor's explicit unregistration asynchronously: marks the record down
     * in storage and removes the address from the online table.
     *
     * @param registryParam registration payload; must be non-null and valid
     * @return {@code ReturnT.SUCCESS} once the work is queued, or a failure result when
     *         the parameter is null or invalid
     */
    public ReturnT<String> registryDown(RegistryParam registryParam) {
        if (registryParam == null || !registryParam.isValid()) {
            return ReturnT.fail("Argument registryParam valid fail");
        }
        registryOrRemoveThreadPool.execute(() -> {
            String executorCode = registryParam.getExecutorCode();
            String executorAddress = registryParam.getExecutorAddress();
            DaoService.getInstance().registryDown(executorCode, executorAddress);

            // The executor unregistered on shutdown: drop it from the scheduler's
            // list of available addresses right away.
            List<String> addresses = onlineExecutorAddressMap.get(executorCode);
            if (addresses != null) {
                addresses.remove(executorAddress);
            }
        });
        return ReturnT.SUCCESS;
    }

    /**
     * @return the live (not copied) table of online executor addresses keyed by executor code
     */
    public static Map<String, List<String>> getOnlineExecutorAddressMap() {
        return onlineExecutorAddressMap;
    }

    /**
     * Returns the addresses of the online instances of the given executor.
     *
     * @param executorCode executor code, equivalent to application.name
     * @return the live address list for the executor, or a new empty list when the
     *         executor is unknown or {@code executorCode} is null; never null
     */
    public static List<String> getOnlineExecutorAddress(String executorCode){
        if (executorCode == null) {
            return new ArrayList<>();
        }
        // single lookup: a containsKey()/get() pair could return null if the key
        // were removed in between
        List<String> addresses = onlineExecutorAddressMap.get(executorCode);
        return addresses != null ? addresses : new ArrayList<>();
    }
}
